/*
 * Copyright (C) 2016, apexes.net. All rights reserved.
 * 
 *        http://www.apexes.net
 * 
 */
package net.apexes.wsonrpc.client;

import net.apexes.wsonrpc.core.Remote;
import net.apexes.wsonrpc.core.WebSocketSession;
import net.apexes.wsonrpc.core.WsonrpcEndpoint;
import net.apexes.wsonrpc.core.WsonrpcLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Executor;

/**
 * Client-side endpoint that connects to the URL taken from a {@link WsonrpcClientConfig},
 * forwards received binary messages to a {@link WsonrpcClientEngine} and reports connection
 * and message events to optional listeners and callbacks.
 *
 * <p>When the configured heartbeat interval is greater than zero, connecting is delegated to a
 * {@link WsonrpcClientImplConnector}; otherwise {@link #connect()} opens the connection directly.
 *
 * <p>The listener, callback and {@code running} fields are {@code volatile} because they are
 * written by the setters / {@link #connect()} / {@link #close()} and read from the endpoint
 * callbacks ({@code onOpen}, {@code onMessage}, {@code onPong}, {@code onClose}).
 * NOTE(review): those callbacks are presumably invoked on threads owned by the WebSocket
 * implementation - confirm against the configured connector.
 *
 * @author <a href="mailto:hedyn@foxmail.com">HeDYn</a>
 *
 */
public class WsonrpcClientImpl extends WsonrpcEndpoint implements WsonrpcClient, WsonrpcClientEndpoint {

    private static final Logger LOG = LoggerFactory.getLogger(WsonrpcClientImpl.class);

    private final WsonrpcClientConfig config;
    private final WsonrpcClientEngine engine;
    /** {@code null} when the configured heartbeat interval is not positive. */
    private final WsonrpcClientImplConnector implConnector;
    private volatile Runnable readyCallback;
    private volatile Runnable abortCallback;
    private volatile WsonrpcClientStatusListener statusListener;
    private volatile WsonrpcClientMessageListener messageListener;
    /** Set by {@link #connect()}, cleared by {@link #close()}; gates the disconnected notification. */
    private volatile boolean running;

    /**
     * Creates a client bound to the given configuration.
     *
     * @param config source of the URL, executor, logger, WebSocket connector and the
     *               heartbeat / reconnect settings
     */
    protected WsonrpcClientImpl(WsonrpcClientConfig config) {
        super(new WsonrpcClientEngine(config));
        this.config = config;
        this.engine = (WsonrpcClientEngine) getWsonrpcEngine();
        int heartbeat = config.getHeartbeatInterval();
        int expireCycle = config.getHeartbeatExpireCycle();
        int min = config.getReconnectIntervalMin();
        int max = config.getReconnectIntervalMax();
        int step = config.getReconnectIntervalStep();
        if (heartbeat <= 0) {
            // No connector: connect() will open the connection directly.
            this.implConnector = null;
        } else {
            this.implConnector = new WsonrpcClientImplConnector(this, config.getPingProvider(),
                    heartbeat, expireCycle, min, max, step);
        }
    }

    /**
     * Runs a callback on the configured executor, or on a new daemon thread when the
     * configuration supplies no executor. Does nothing for a {@code null} callback.
     *
     * @param runnable the callback to run, may be {@code null}
     */
    private void executeCallback(Runnable runnable) {
        if (runnable != null) {
            Executor executor = config.getExecutor();
            if (executor == null) {
                Thread thread = new Thread(runnable);
                thread.setDaemon(true);
                thread.start();
            } else {
                executor.execute(runnable);
            }
        }
    }

    /**
     * Reports a failed connection attempt to the status listener; listener exceptions are logged.
     *
     * @param throwable the failure raised while connecting
     */
    private void fireConnectError(Throwable throwable) {
        try {
            // Snapshot the field so a concurrent setter cannot null it between check and call.
            WsonrpcClientStatusListener listener = statusListener;
            if (listener != null) {
                listener.onConnectError(throwable);
            }
        } catch (Exception e) {
            LOG.warn("fireConnectError error.", e);
        }
    }

    /**
     * Notifies the connector (if any) and the status listener that the session is open, then
     * schedules the ready callback. Exceptions are logged and not propagated.
     */
    private void fireConnected() {
        try {
            if (implConnector != null) {
                implConnector.onConnected();
            }
            WsonrpcClientStatusListener listener = statusListener;
            if (listener != null) {
                listener.onConnected(this);
            }
            executeCallback(readyCallback);
        } catch (Exception e) {
            LOG.warn("fireConnected error.", e);
        }
    }

    /**
     * Notifies the connector (if any) and the status listener that the connection was lost, then
     * schedules the abort callback. Exceptions are logged and not propagated.
     *
     * @param code   close code received by {@link #onClose(int, String)}
     * @param reason close reason received by {@link #onClose(int, String)}
     */
    private void fireDisconnected(int code, String reason) {
        try {
            if (implConnector != null) {
                implConnector.onDisconnected();
            }
            WsonrpcClientStatusListener listener = statusListener;
            if (listener != null) {
                listener.onDisconnected(this, code, reason);
            }
            executeCallback(abortCallback);
        } catch (Exception e) {
            LOG.warn("fireDisconnected error. code=" + code + ", reason=" + reason, e);
        }
    }

    /**
     * Tells the connector (if any) that a message was received. Exceptions are logged and not
     * propagated, matching the other {@code fire*} helpers.
     */
    private void fireRecvMessage() {
        try {
            if (implConnector != null) {
                implConnector.onMessage();
            }
        } catch (Exception e) {
            LOG.warn("fireRecvMessage error.", e);
        }
    }

    /**
     * Tells the connector (if any) and the message listener that a pong was received. Exceptions
     * are logged and not propagated, so a failing listener cannot break the caller of
     * {@link #onPong(byte[])}.
     *
     * @param bytes the pong payload
     */
    private void fireRecvPong(byte[] bytes) {
        try {
            if (implConnector != null) {
                implConnector.onPong();
            }
            WsonrpcClientMessageListener listener = messageListener;
            if (listener != null) {
                listener.onRecvPong(this, bytes);
            }
        } catch (Exception e) {
            LOG.warn("fireRecvPong error.", e);
        }
    }

    /**
     * Reports to the status listener that {@link #close()} was called; listener exceptions are
     * logged.
     */
    private void fireClose() {
        try {
            WsonrpcClientStatusListener listener = statusListener;
            if (listener != null) {
                listener.onClose(this);
            }
        } catch (Exception e) {
            LOG.warn("fireClose error.", e);
        }
    }

    /**
     * Notifies the connector (if any) and the message listener that a binary message was sent.
     * Exceptions are logged and not propagated.
     *
     * @param bytes the bytes that were sent
     */
    void fireSentMessage(byte[] bytes) {
        try {
            if (implConnector != null) {
                implConnector.onSentMessage();
            }
            WsonrpcClientMessageListener listener = messageListener;
            if (listener != null) {
                listener.onSentMessage(this, bytes);
            }
        } catch (Exception e) {
            LOG.warn("fireSentMessage error.", e);
        }
    }

    /**
     * Notifies the connector (if any) and the message listener that a ping was sent.
     * Exceptions are logged and not propagated.
     *
     * @param bytes the ping payload that was sent
     */
    void fireSentPing(byte[] bytes) {
        try {
            if (implConnector != null) {
                implConnector.onSentPing();
            }
            WsonrpcClientMessageListener listener = messageListener;
            if (listener != null) {
                listener.onSentPing(this, bytes);
            }
        } catch (Exception e) {
            LOG.warn("fireSentPing error.", e);
        }
    }

    /**
     * Opens the connection to the configured URL through the configured WebSocket connector.
     * Does nothing when already connected.
     *
     * @throws Exception any failure raised while connecting; it is reported to the status
     *                   listener via {@code onConnectError} before being rethrown
     */
    void connectToServer() throws Exception {
        if (!isConnected()) {
            URI uri = URI.create(config.getUrl());
            try {
                config.getWebsocketConnector().connectToServer(this, uri);
            } catch (Throwable t) {
                fireConnectError(t);
                throw t;
            }
        }
    }

    /**
     * @return this client, which serves as its own {@link Remote}
     */
    @Override
    public Remote getRemote() {
        return this;
    }

    /**
     * @param readyCallback callback scheduled after the session has been opened, may be {@code null}
     */
    @Override
    public void setReadyCallback(Runnable readyCallback) {
        this.readyCallback = readyCallback;
    }

    /**
     * @param abortCallback callback scheduled after a disconnect is reported, may be {@code null}
     */
    @Override
    public void setAbortCallback(Runnable abortCallback) {
        this.abortCallback = abortCallback;
    }

    /**
     * @param listener receiver of connect / disconnect / close events, may be {@code null}
     */
    @Override
    public void setStatusListener(WsonrpcClientStatusListener listener) {
        this.statusListener = listener;
    }

    /**
     * @param listener receiver of sent-message, sent-ping and received-pong events, may be {@code null}
     */
    @Override
    public void setMessageListener(WsonrpcClientMessageListener listener) {
        this.messageListener = listener;
    }

    /**
     * Marks the client as running and either starts the connector (when one exists and is not
     * already running) or connects to the server directly.
     *
     * @throws Exception if the direct connection attempt fails
     */
    @Override
    public void connect() throws Exception {
        running = true;
        if (implConnector != null) {
            if (!implConnector.isRunning()) {
                implConnector.start();
            }
        } else {
            connectToServer();
        }
    }

    /**
     * Stops the connector (if any), disconnects and reports the close to the status listener.
     */
    @Override
    public void close() {
        // Cleared before disconnecting so that onClose(int, String) skips fireDisconnected
        // for a shutdown requested through this method.
        running = false;
        if (implConnector != null) {
            implConnector.stop();
        }
        disconnect();
        fireClose();
    }

    /**
     * Registers the new session, wrapped so that outgoing traffic is reported to the listeners,
     * and fires the connected notifications.
     *
     * @param session the newly opened WebSocket session
     */
    @Override
    public void onOpen(WebSocketSession session) {
        super.openSession(new WsonrpcSessionProxy(session));
        fireConnected();
    }

    /**
     * Hands a received binary message to the engine; engine failures are routed to
     * {@link #onError(Throwable)}.
     *
     * @param bytes the received message
     */
    @Override
    public void onMessage(final byte[] bytes) {
        fireRecvMessage();
        try {
            engine.handle(getSession(), bytes);
        } catch (Exception ex) {
            onError(ex);
        }
    }

    /**
     * @param bytes the received pong payload
     */
    @Override
    public void onPong(final byte[] bytes) {
        fireRecvPong(bytes);
    }

    /**
     * Forwards an error to the configured {@link WsonrpcLogger}, if any, tagged with the current
     * session id or, when there is none, the closed session id.
     *
     * @param error the error to report
     */
    @Override
    public void onError(Throwable error) {
        WsonrpcLogger logger = config.getWsonrpcLogger();
        if (logger != null) {
            String sessionId = getSessionId();
            if (sessionId == null) {
                sessionId = getClosedSessonId();
            }
            logger.onError(sessionId, error);
        }
    }

    /**
     * Closes the local session if it is still connected and, while the client is running,
     * fires the disconnected notifications.
     *
     * @param code   the close code
     * @param reason the close reason
     */
    @Override
    public void onClose(int code, String reason) {
        if (isConnected()) {
            try {
                closeSession();
            } catch (Exception e) {
                // Best effort: the connection is going away anyway, so only record the failure.
                LOG.debug("closeSession error.", e);
            }
        }
        if (running) {
            fireDisconnected(code, reason);
        }
    }

    /**
     * Wraps a {@link WebSocketSession} so that every successful {@code sendBinary} / {@code ping}
     * is reported through {@link #fireSentMessage(byte[])} / {@link #fireSentPing(byte[])}.
     * All other calls are delegated unchanged.
     *
     * @author <a href="mailto:hedyn@foxmail.com">HeDYn</a>
     *
     */
    private class WsonrpcSessionProxy implements WebSocketSession {

        private final WebSocketSession session;

        private WsonrpcSessionProxy(WebSocketSession session) {
            this.session = session;
        }

        @Override
        public String getId() {
            return session.getId();
        }

        @Override
        public boolean isOpen() {
            return session.isOpen();
        }

        @Override
        public void sendBinary(byte[] bytes) throws IOException {
            session.sendBinary(bytes);
            // Reached only when the send did not throw.
            fireSentMessage(bytes);
        }

        @Override
        public void ping(byte[] bytes) throws IOException {
            session.ping(bytes);
            // Reached only when the ping did not throw.
            fireSentPing(bytes);
        }

        @Override
        public void close() throws IOException {
            session.close();
        }
    }
}
